package onion.mqtt.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import onion.mqtt.server.manager.SessionManager;
import onion.mqtt.server.manager.SubscribeManager;
import onion.mqtt.server.store.SessionStore;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Netty-based MQTT server.
 *
 * <p>Owns the listening channel and the boss/worker event loop groups, and offers
 * server-initiated publishing to clients whose subscriptions match a topic.
 *
 * @author Mr, Lu
 * @developmentTeam 浙江允泽信息科技有限公司
 * @createTime 2023/12/12
 */
public class MqttServer {
    static final Logger log = LoggerFactory.getLogger(MqttServer.class);
    /** Listening (server) channel; {@code null} until {@link #start()} has bound successfully. */
    private Channel channel;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workGroup;
    private final MqttServerConfig config;
    private final MqttServerBuilder serverBuilder;

    /**
     * Creates a server from the given builder; the configuration is read from the builder.
     *
     * @param serverBuilder builder supplying the configuration and handler dependencies
     */
    public MqttServer(MqttServerBuilder serverBuilder) {
        this.serverBuilder = serverBuilder;
        this.config = serverBuilder.getConfig();
    }

    /**
     * Publishes a message to every subscriber of the topic whose subscription QoS equals {@code qos}.
     *
     * @param topic   topic name
     * @param payload message body; {@code null} or empty sends an empty payload
     * @param qos     quality of service of the message
     * @param retain  value of the retain flag
     * @return {@code true} if the message was handed to at least one active subscriber channel;
     *         the network write itself completes asynchronously and failures are logged
     */
    public boolean publish(String topic, byte[] payload, MqttQoS qos, boolean retain) {
        return dispatch(null, topic, payload, qos, retain);
    }

    /**
     * Publishes a message to a single client, provided that client has a matching subscription
     * on the topic whose QoS equals {@code qos}.
     *
     * @param clientId identifier of the target client, must not be {@code null}
     * @param topic    topic name
     * @param payload  message body; {@code null} or empty sends an empty payload
     * @param qos      quality of service of the message
     * @param retain   value of the retain flag
     * @return {@code true} if the message was handed to the client's active channel;
     *         the network write itself completes asynchronously and failures are logged
     * @throws NullPointerException if {@code clientId} is {@code null}
     */
    public boolean publish(String clientId, String topic, byte[] payload, MqttQoS qos, boolean retain) {
        Objects.requireNonNull(clientId, "clientId");
        return dispatch(clientId, topic, payload, qos, retain);
    }

    /**
     * Builds one PUBLISH message and writes a retained duplicate of it to each matching subscriber.
     *
     * @param targetClientId restricts delivery to this client when non-null; {@code null} means all subscribers
     */
    private boolean dispatch(String targetClientId, String topic, byte[] payload, MqttQoS qos, boolean retain) {
        // The payload is supplied through the builder instead of being written into the built
        // message afterwards; the array is copied so later changes by the caller cannot leak in.
        // NOTE(review): no packet id is assigned here; MQTT requires a non-zero packet identifier
        // for QoS 1/2 — confirm whether ids/acks for server-initiated publishes are handled elsewhere.
        MqttPublishMessage message = MqttMessageBuilders.publish()
                .topicName(topic)
                .qos(qos)
                .retained(retain)
                .payload(ObjectUtils.isNotEmpty(payload) ? Unpooled.copiedBuffer(payload) : Unpooled.EMPTY_BUFFER)
                .build();
        AtomicBoolean dispatched = new AtomicBoolean(false);
        try {
            SubscribeManager.getInstance().searchSubscribe(topic).forEach(subscribeStore -> {
                // NOTE(review): only subscriptions with exactly the same QoS receive the message;
                // the MQTT spec delivers at min(publish QoS, subscription QoS) — confirm this is intended.
                if (!qos.equals(subscribeStore.getMqttQoS())) {
                    return;
                }
                if (targetClientId != null && !targetClientId.equals(subscribeStore.getClientId())) {
                    return;
                }
                SessionStore session = SessionManager.getInstance().getSession(subscribeStore.getClientId());
                if (session == null) {
                    return;
                }
                // Write to the subscriber's own connection, not to the server's listening channel.
                Channel target = session.getChannel();
                if (target == null || !target.isActive()) {
                    return;
                }
                // Each write releases the message it is given, so every recipient gets its own
                // retained duplicate (shared bytes, independent reader index and reference).
                target.writeAndFlush(message.retainedDuplicate()).addListener(future -> {
                    if (!future.isSuccess()) {
                        log.warn("failed to publish topic {} to client {}",
                                topic, subscribeStore.getClientId(), future.cause());
                    }
                });
                dispatched.set(true);
            });
        } finally {
            // Drop the reference held by this method; the duplicates keep the buffer alive until written.
            message.release();
        }
        return dispatched.get();
    }

    /**
     * Creates the event loop groups, configures the bootstrap and binds the listening socket.
     *
     * <p>Binds to the configured host when one is set, otherwise to all local addresses on the
     * configured port. Failures are logged and the server is stopped; nothing is rethrown.
     */
    public void start() {
        bossGroup = new NioEventLoopGroup(config.getBossThread());
        workGroup = new NioEventLoopGroup(config.getWorkThread());

        MqttServerInboundHandler handler = new MqttServerInboundHandler(serverBuilder);
        MqttServerChannelInitializer channelInitializer = new MqttServerChannelInitializer(serverBuilder.getConfig(), handler);

        try {
            ServerBootstrap bs = new ServerBootstrap();
            bs.group(bossGroup, workGroup)
                    // Non-blocking NIO server socket.
                    .channel(NioServerSocketChannel.class)
                    // Listening-socket options must be set with option(), not childOption():
                    // SO_BACKLOG is the length of the queue of pending connections,
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // SO_REUSEADDR lets the listening address be rebound without waiting for old sockets to expire.
                    .option(ChannelOption.SO_REUSEADDR, true)
                    // Accepted connections allocate buffers from the shared pooled allocator.
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    // SO_KEEPALIVE enables TCP keep-alive probes on accepted connections.
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    // TCP_NODELAY disables Nagle's algorithm so small packets are sent without delay.
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childHandler(channelInitializer);
            channel = StringUtils.isNotBlank(config.getHost())
                    ? bs.bind(config.getHost(), config.getPort()).sync().channel()
                    : bs.bind(config.getPort()).sync().channel();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            log.error("mqtt server start interrupted", e);
            this.stop();
        } catch (Exception e) {
            // Log with the full stack trace; the message alone is often null or uninformative.
            log.error("mqtt server failed to start", e);
            this.stop();
        }
    }

    /**
     * Closes the listening channel and shuts down both event loop groups.
     *
     * <p>Safe to call when the server was never started or failed during start-up.
     */
    public void stop() {
        // Close the listening socket first so no new connections are accepted while shutting down.
        // awaitUninterruptibly() does not rethrow a close failure, so the groups are always released.
        if (channel != null) {
            channel.close().awaitUninterruptibly();
            channel = null;
        }

        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
            bossGroup = null;
        }

        if (workGroup != null) {
            workGroup.shutdownGracefully();
            workGroup = null;
        }

        // A regular shutdown is not an error condition.
        log.info("mqtt server stopped");
    }
}
